package com.faner4cloud.yun.common.util.delayqueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

import java.util.Iterator;
import java.util.concurrent.*;

/**
 * @author ...zz
 * @version v1
 * @summary 延迟worker
 * @since 2022/8/2 5:14 PM
 */
public abstract class DelayWorker<T> {

	private final static Logger logger = LoggerFactory.getLogger(DelayWorker.class);

	@Value("${spring.profiles.active}")
	private String active;

	/**
	 * 延迟队列
	 */
	private DelayQueue<DelayItem<T>> queue;

	/**
	 * 工作线程池
	 * 从延迟队列中获取超时任务，提交给线程池处理
	 */
	private Thread worker;

	/**
	 *
	 */
	private ExecutorService executor;

	/**
	 * 默认参数
	 */
	private int coreThread = 3;
	private int maxThread = 10;
	private int executorCapacity = 1024;

	/**
	 * 任务超时后的处理方法
	 * 根据业务逻辑实现，需要考虑同步控制，事务控制，异常处理等
	 *
	 * @param item 队列元素
	 */
	protected abstract void handleDelayed(DelayItem<T> item);

	/**
	 * 初始化，需要在实现类中初始化时调用
	 * 初始化队列、工作线程、线程池
	 */
	protected void init(){
		if (coreThread <= 0){
			coreThread = 1;
		}
		if (maxThread <= coreThread){
			maxThread = coreThread * 2;
		}
		if (executorCapacity <= 1024){
			executorCapacity = 1024;
		}
		logger.info("延迟worker初始化开始...");
		queue = new DelayQueue<>();
		executor = new ThreadPoolExecutor(coreThread, maxThread, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(executorCapacity));
		worker = new Thread(this::work);
		worker.setName("延迟worker线程");
		worker.start();
		logger.info("延迟worker初始化完成...");
	}

	/**
	 * 循环获取超时的任务，提交线程池处理
	 */
	private void work(){
		logger.info("延迟worker开始工作...");
		while (true){
			try {
				DelayItem<T> item = queue.take();
//                logger.info("获取延迟任务：data = " + item.getData().toString());
				// 提交线程池处理
				executor.submit(() -> this.handleDelayed(item));
			}catch (InterruptedException e){
				logger.error("从延迟队列获取任务失败, message : " + e);
			}
		}
	}

	/**
	 * 把任务放入延迟队列
	 *
	 * @param data 任务
	 * @param activeTime 存活时间
	 * @param unit 单位
	 * @param ctime 任务创建时间戳，毫秒
	 */
	public void put(T data, long activeTime, TimeUnit unit, long ctime){
//        logger.info("加入延迟队列:data = " + data.toString());
		long expire = ctime + TimeUnit.MILLISECONDS.convert(activeTime, unit);
		queue.put(new DelayItem<>(data, expire));
	}

	/**
	 * 把任务从队列中移除
	 */
	public void remove(T data){
//        logger.info("移出延迟队列:data = " + data.toString());
		queue.remove(new DelayItem<>(data, 0));
	}

	protected void setCoreThread(int coreThread) {
		this.coreThread = coreThread;
	}

	protected void setMaxThread(int maxThread) {
		this.maxThread = maxThread;
	}

	protected void setExecutorCapacity(int executorCapacity) {
		this.executorCapacity = executorCapacity;
	}

	protected int getQueueSize(){
		return queue.size();
	}

	protected Iterator<DelayItem<T>> iterator(){
		return queue.iterator();
	}

}
